package com.at.sink10;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;
import java.time.ZoneId;

/**
 * Demo Flink job that writes a rate-limited stream of generated strings to the file system
 * through a row-format {@link FileSink}.
 *
 * <p>Output is bucketed into one directory per hour (pattern {@code yyyy-MM-dd HH}, system
 * default time zone). Part files are named {@code at-*.log} and are rolled after 1 minute or
 * once they exceed 1 MiB.
 *
 * <p>Usage: {@code SinkFile1 [outputDir]}. When {@code outputDir} is omitted, {@value
 * #DEFAULT_OUTPUT_DIR} is used.
 *
 * @author huangchao E-mail:fengquan8866@163.com
 * @version created: 2024/9/25 19:11
 */
public class SinkFile1 {

    /** Output directory used when no path is given on the command line. */
    private static final String DEFAULT_OUTPUT_DIR = "D:/tmp/flink";

    /**
     * Builds and runs the job.
     *
     * @param args optional; {@code args[0]} overrides the output directory
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        String outputDir = args.length > 0 ? args[0] : DEFAULT_OUTPUT_DIR;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Every parallel sink subtask writes its own part file, so each bucket directory
        // ends up with (up to) one file per unit of parallelism.
        env.setParallelism(2);

        // Checkpointing is required: FileSink only finalizes part files on a successful
        // checkpoint, otherwise they remain in the ".inprogress" state forever.
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        // Maps each generated index to a line of text. The output type is declared explicitly
        // via Types.STRING below, so the lambda's erased type is not a problem.
        GeneratorFunction<Long, String> generator = index -> "Number:" + index;

        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                generator,
                Long.MAX_VALUE,                    // effectively unbounded
                RateLimiterStrategy.perSecond(500),
                Types.STRING
        );

        DataStreamSource<String> dataGen = env
                .fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "date-generator");

        // Write the stream to the file system.
        FileSink<String> fileSink = FileSink
                // Row-format output: target directory and per-record encoder.
                .<String>forRowFormat(new Path(outputDir), new SimpleStringEncoder<>("UTF-8"))
                // Output file naming: part-file prefix and suffix.
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("at-")
                                .withPartSuffix(".log")
                                .build()
                )
                // Bucket (sub-directory) assignment: one directory per hour.
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                // Rolling policy: start a new part file after 1 minute or once it exceeds 1 MiB.
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(1))
                                .withMaxPartSize(MemorySize.ofMebiBytes(1))
                                .build()
                )
                .build();
        dataGen.sinkTo(fileSink);

        env.execute();
    }
}
